Add Anthropic provider (apache-airflow-providers-anthropic)#69003

Open
kaxil wants to merge 1 commit into apache:main from astronomer:add-anthropic-provider

@kaxil kaxil commented Jun 26, 2026

Member

New provider apache-airflow-providers-anthropic for the Anthropic Claude API, so Dag authors call Claude through the official Anthropic Python SDK instead of embedding SDK calls in task code.

It ships:

  • AnthropicHook — builds the right client for the configured platform: first-party API, Amazon Bedrock, Google Vertex AI, Claude Platform on AWS, and Microsoft Foundry. Supports keyless Workload Identity Federation (configured on the connection or resolved from the environment).
  • AnthropicBatchOperator (deferrable) + AnthropicBatchSensor + AnthropicBatchTrigger — the Message Batches API (submit → defer → collect).
  • AnthropicAgentSessionOperator (deferrable) + AnthropicAgentSessionTrigger — Managed Agents sessions (message and outcome runs).
  • Connection form fields (platform, default model, cloud region/resource) render as labelled inputs via declarative conn-fields, not raw Extra JSON.

Design rationale

  • Separate provider, not part of common.ai. common.ai is a provider-agnostic abstraction; this provider exposes Anthropic-specific surfaces (Message Batches, Managed Agents, token counting, the platform clients) that do not fit a neutral interface. Users who want a provider-agnostic layer keep using common.ai; users who want direct Claude/SDK features use this one.
  • Batch-centric operators. A one-shot message is a single hook call; the operator-level value lies in the asynchronous Message Batches workflow, which is what the operator, trigger, and sensor model.
  • Airflow 3+ only. Airflow 2 is EOL; the provider uses the Task SDK execution model and the AF3 connection-form metadata.
  • Deferrable and synchronous paths stay symmetric. Deferrable mode releases the worker slot during long batches/sessions; deferrable=False mirrors the same cancel-on-timeout (batch) / archive-on-timeout (session) teardown so neither mode leaks a server-side resource.
  • model on the connection. The default model is read from extra['model'], so it can change without editing Dags; if unset, it falls back to claude-opus-4-8.
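
As a rough illustration of the symmetric teardown described above, the synchronous (deferrable=False) batch path could look something like the sketch below. This is not the provider's code: wait_for_batch, BatchTimeout, and the get_status/cancel callables are hypothetical stand-ins for whatever the hook exposes, and only the "ended" status and the cancel-on-timeout behaviour come from this PR.

```python
import time
from typing import Callable


class BatchTimeout(Exception):
    """Raised when a batch does not end before the deadline (hypothetical name)."""


def wait_for_batch(
    get_status: Callable[[], str],
    cancel: Callable[[], None],
    timeout: float,
    poll_interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the batch reports "ended"; cancel it server-side on timeout.

    get_status and cancel are assumed callables wrapping the real API calls.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == "ended":
            return status
        if clock() >= deadline:
            # Mirror the deferrable path: do not leave the batch running
            # server-side once the task has given up on it.
            cancel()
            raise BatchTimeout(f"batch still {status!r} after {timeout}s")
        sleep(poll_interval)
```

The session path would presumably follow the same shape with an archive call in place of cancel. Injecting clock and sleep keeps the loop testable without real waiting.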

Notes / gotchas

  • First-party-only endpoints (Message Batches, token counting, the Models API) are gated to platform in {anthropic, aws}; calling them on Bedrock/Vertex/Foundry raises a clear error rather than a raw 404.
  • Ships as state: not-ready / lifecycle: incubation while the Managed Agents beta surface stabilises.
  • Requires anthropic>=0.101.0 (the release that introduced AnthropicAWS); verified against 0.109.x.
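
A minimal sketch of the platform gate mentioned in the first note, assuming the platform is available as a plain string. The set {anthropic, aws} is taken from the note; require_first_party, UnsupportedPlatformError, and the "bedrock" identifier used in the usage comment are hypothetical names, not the provider's API.

```python
# Platforms that expose the first-party-only endpoints, per the note above.
FIRST_PARTY_PLATFORMS = frozenset({"anthropic", "aws"})


class UnsupportedPlatformError(RuntimeError):
    """Hypothetical error type for a feature the platform does not offer."""


def require_first_party(platform: str, feature: str) -> None:
    """Fail early with a readable message instead of surfacing a raw 404."""
    if platform not in FIRST_PARTY_PLATFORMS:
        supported = ", ".join(sorted(FIRST_PARTY_PLATFORMS))
        raise UnsupportedPlatformError(
            f"{feature} is not available on platform {platform!r}; "
            f"supported platforms: {supported}"
        )


# Usage: call the guard before touching a gated endpoint.
# require_first_party("bedrock", "Message Batches")  # raises UnsupportedPlatformError
```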

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

@amoghrajesh amoghrajesh left a comment

Contributor

From a static code review, I am taking it for a spin now to see how it goes.

Comment thread providers/anthropic/src/airflow/providers/anthropic/operators/anthropic.py Outdated
Comment thread providers/anthropic/tests/unit/anthropic/operators/test_agent.py Outdated
Comment thread providers/anthropic/docs/connections.rst Outdated
Comment thread providers/anthropic/docs/connections.rst
Comment thread providers/anthropic/docs/index.rst
ENDED = "ended"

@classmethod
def is_in_progress(cls, status: str) -> bool:

Member

This makes sense, but I think it's a bit confusing.

in_progress is in_progress, which is expected, but canceling also counts as in_progress. It's not incorrect, though it does make the meaning of in_progress less clear.

Member Author

Fair point. canceling is non-terminal (we poll until ended), so it returns True too, which reads oddly next to the in_progress enum value. The method really means "not yet terminal"; I kept the name to mirror the SDK's processing_status vocabulary. Happy to rename it to is_not_terminal (or add a docstring note) if you'd prefer, let me know.

Member

Consistency sounds like a better way to go. But let's extend the docstring/comment here so we can avoid confusion as much as possible. Thanks
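
For illustration, the outcome of this thread (keep the name, extend the docstring) might look roughly like the sketch below. Only ENDED and the is_in_progress signature appear in the diff context; the class name BatchStatus, the IN_PROGRESS and CANCELING members, and the membership check are assumptions based on the discussion.

```python
from enum import Enum


class BatchStatus(str, Enum):
    """Processing status of a message batch (class name is hypothetical)."""

    IN_PROGRESS = "in_progress"
    CANCELING = "canceling"
    ENDED = "ended"

    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        """Return True while the batch has not reached a terminal state.

        Despite the name, this covers more than the ``in_progress`` value:
        ``canceling`` is also non-terminal, because polling continues until
        the batch reports ``ended``. Read it as "not yet terminal".
        """
        return status in (cls.IN_PROGRESS.value, cls.CANCELING.value)
```

With this shape, an unrecognised status string is treated as not in progress; comparing against ENDED instead would make the opposite choice.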

Comment thread providers/anthropic/src/airflow/providers/anthropic/hooks/anthropic.py Outdated
Comment thread providers/anthropic/tests/system/anthropic/example_anthropic_agent.py Outdated
Comment thread providers/anthropic/tests/system/anthropic/example_anthropic_batch.py Outdated
Comment thread providers/anthropic/tests/system/anthropic/example_anthropic_agent.py Outdated
@kaxil kaxil force-pushed the add-anthropic-provider branch from b64bf04 to 260a157 Compare June 26, 2026 12:10
Add a dedicated provider for the Anthropic Claude API so Dag authors can use
Claude through the official Anthropic Python SDK instead of hand-writing SDK
calls in tasks. It offers first-class connection management (first-party API,
Amazon Bedrock, Google Vertex AI, Claude Platform on AWS, Microsoft Foundry,
and keyless Workload Identity Federation) with labelled connection form fields,
plus deferrable Message Batches and Managed Agents session operators, a batch
sensor, and matching triggers.

Targets Airflow 3+ and ships as not-ready / incubation while the Managed Agents
beta surface stabilises.